{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# N-way merge with remote data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "How to merge multiple *sorted* remote data streams using the `heapq.merge` function that ships with Python."
   ]
  },
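  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a quick local illustration, with plain lists standing in for the remote streams: `heapq.merge` takes any number of already-sorted iterables and returns a lazy iterator over all of their items in sorted order. It does not verify that its inputs are sorted; unsorted inputs will generally give unsorted output."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import heapq\n",
    "\n",
    "# Three already-sorted inputs; plain lists here, but any iterables work.\n",
    "a = [1, 4, 9]\n",
    "b = [2, 3, 10]\n",
    "c = [5, 6]\n",
    "\n",
    "merged = heapq.merge(a, b, c)  # a lazy iterator: nothing is compared yet\n",
    "result = list(merged)          # consuming it performs the merge\n",
    "print(result)  # [1, 2, 3, 4, 5, 6, 9, 10]"
   ]
  },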
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from __future__ import print_function\n",
    "import heapq\n",
    "\n",
    "from IPython.display import display\n",
    "from IPython import parallel\n",
    "\n",
    "rc = parallel.Client()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Imagine we have a routine that loads (or creates) a sorted subset of our data on an engine, based on a parameter such as the index of the part of the data to read:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def load_data(arg):\n",
    "    \"\"\"Load a dataset in the global namespace. The dataset *must* be sorted.\n",
    "\n",
    "    Return the *name* of the variable in which the dataset was loaded.\"\"\"\n",
    "    global data\n",
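    "    # Here, real data loading would occur; this toy version builds a sorted list\n",
    "    s = 4 - arg\n",
    "    step = arg + 1\n",
    "    # list() so that Python 3 stores the values, not a lazy range object\n",
    "    data = list(range(s, s + 4*step**2, step))\n",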
    "    return 'data'"
   ]
  },
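  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because `heapq.merge` trusts its inputs to be sorted, it can be worth checking that precondition while developing a loader. `is_sorted` below is a hypothetical helper, not part of the original exercise, and the sample reproduces the formula inside `load_data` for `arg = 1` without touching any global variable."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def is_sorted(seq):\n",
    "    \"\"\"Hypothetical helper: return True if seq is in non-decreasing order.\"\"\"\n",
    "    items = list(seq)\n",
    "    return all(x <= y for x, y in zip(items, items[1:]))\n",
    "\n",
    "\n",
    "# Same formula as inside load_data, for arg = 1, computed locally.\n",
    "arg = 1\n",
    "s = 4 - arg\n",
    "step = arg + 1\n",
    "sample = list(range(s, s + 4*step**2, step))\n",
    "print('%s sorted: %s' % (sample, is_sorted(sample)))"
   ]
  },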
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Exercise"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We want a function that takes a given single-engine View and a variable name,\n",
    "and returns a local iterator on the remote object.\n",
    "It should look something like this skeleton function:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def remote_iterator(view, name):\n",
    "    \"\"\"Return an iterator on an object living on a remote engine.\"\"\"\n",
    "    # TODO: create an iterator remotely\n",
    "    while True:\n",
    "        pass\n",
    "        # TODO: yield the next item\n",
    "        # TODO: on a remote StopIteration, end this generator with return"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Relevant Aside:\n",
    "\n",
    "Errors raised on engines show up in the Client wrapped in a RemoteError rather than as their original class.\n",
    "This means you have to be a little careful when trying to catch remote errors; an `except NameError` clause, for example, will not fire:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "try:\n",
    "    rc[-1].execute(\"foo = barbarbar\", block=True)\n",
    "except NameError:\n",
    "    print(\"caught NameError\")\n",
    "except Exception as e:\n",
    "    print(\"Oops! Didn't catch %r\" % e)\n",
    "    raise\n",
    "print(\"safe and sound\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Among its attributes, a RemoteError carries these three:\n",
    "\n",
    "* `err.ename` - the class name of the remote error (e.g. `NameError`, `ValueError`)\n",
    "* `err.evalue` - the string value of the error message\n",
    "* `err.traceback` - the remote traceback as a list of strings"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For simple builtin exceptions,\n",
    "you can re-raise a remote error as its original exception class by checking `ename`, as in the following:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def assign_foo():\n",
    "    try:\n",
    "        rc[-1].execute(\"foo = barbarbar\", block=True)\n",
    "    except parallel.RemoteError as e:\n",
    "        if e.ename == 'NameError':\n",
    "            raise NameError(e.evalue)\n",
    "        else:\n",
    "            raise  # not a NameError: re-raise the RemoteError unchanged"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With this re-cast in place, exception handlers outside `assign_foo` treat the remote error just like a local `NameError`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "try:\n",
    "    assign_foo()\n",
    "except NameError:\n",
    "    print(\"caught NameError\")\n",
    "except Exception as e:\n",
    "    print(\"Oops! Didn't catch %r\" % e)\n",
    "    raise\n",
    "print(\"safe and sound\")"
   ]
  },
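  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Checking one `ename` at a time gets repetitive. One possible generalization, sketched here and not part of IPython.parallel, looks the name up among Python's builtin exceptions and otherwise re-raises the original error; with a real cluster, the `except parallel.RemoteError as e:` branch could call `reraise_as_local(e)`. So that the sketch runs without engines, `FakeRemoteError` is a hypothetical stand-in that only mimics the `ename` and `evalue` attributes of a real RemoteError."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "try:\n",
    "    import builtins  # Python 3\n",
    "except ImportError:\n",
    "    import __builtin__ as builtins  # Python 2\n",
    "\n",
    "\n",
    "class FakeRemoteError(Exception):\n",
    "    \"\"\"Hypothetical stand-in for parallel.RemoteError (ename/evalue only).\"\"\"\n",
    "    def __init__(self, ename, evalue):\n",
    "        Exception.__init__(self, '%s(%s)' % (ename, evalue))\n",
    "        self.ename = ename\n",
    "        self.evalue = evalue\n",
    "\n",
    "\n",
    "def reraise_as_local(err):\n",
    "    \"\"\"Re-raise err as the builtin exception class named by err.ename, if any.\"\"\"\n",
    "    cls = getattr(builtins, err.ename, None)\n",
    "    if isinstance(cls, type) and issubclass(cls, Exception):\n",
    "        # Assumes the class accepts a single message argument; classes such as\n",
    "        # UnicodeDecodeError need more and would raise TypeError here.\n",
    "        raise cls(err.evalue)\n",
    "    raise err  # unknown or non-builtin class: keep the remote error\n",
    "\n",
    "\n",
    "try:\n",
    "    reraise_as_local(FakeRemoteError('NameError', 'name barbarbar is not defined'))\n",
    "except NameError as e:\n",
    "    print('caught NameError: %s' % e)"
   ]
  },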
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
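    "Can you fill out this `remote_iterator` function?\n",
    "\n",
    "Potentially useful:\n",
    "\n",
    "* catching `RemoteError`s\n",
    "* `parallel.Reference`, which stands in for a variable in the engine's namespace when passed as an argument to a function run with `apply`\n",
    "* `yield`"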
   ]
  },
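  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One detail for the skeleton's last TODO, about StopIteration: since Python 3.7 (PEP 479), a `StopIteration` raised inside a generator body is turned into a `RuntimeError`. A generator such as `remote_iterator` should therefore finish with a plain `return` once the remote side reports `StopIteration`. The two toy generators below are hypothetical and only illustrate the difference."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import sys\n",
    "\n",
    "\n",
    "def ends_with_return(items):\n",
    "    for x in items:\n",
    "        yield x\n",
    "    return  # clean end: the consumer sees ordinary exhaustion\n",
    "\n",
    "\n",
    "def ends_with_raise(items):\n",
    "    for x in items:\n",
    "        yield x\n",
    "    raise StopIteration  # the pre-PEP 479 idiom\n",
    "\n",
    "\n",
    "print(list(ends_with_return([1, 2])))  # [1, 2]\n",
    "try:\n",
    "    # [1, 2] before Python 3.7; RuntimeError from Python 3.7 on\n",
    "    print(list(ends_with_raise([1, 2])))\n",
    "except RuntimeError as e:\n",
    "    print('RuntimeError: %s' % e)\n",
    "print('Python %d.%d' % sys.version_info[:2])"
   ]
  },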
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def remote_iterator(view, name):\n",
    "    \"\"\"Return an iterator on an object living on a remote engine.\"\"\"\n",
    "    # TODO: create an iterator remotely\n",
    "    while True:\n",
    "        pass\n",
    "        # TODO: yield the next item\n",
    "        # TODO: on a remote StopIteration, end this generator with return"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A local example that should be a good guideline for the remote version:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%load soln/remote_iter_hint.py"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And the solution:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%load soln/remote_iter.py"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And an ever-so-slightly fancier solution:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%load soln/remote_iter_slightly_better.py"
   ]
  },
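  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Without a cluster at hand, the control flow of a remote iterator can still be tried locally. In the sketch below, a plain dict plays the role of an engine's namespace, and `run_on_engine` wraps any error in `FakeRemoteError`, roughly the way an engine's error reaches the client as a RemoteError. These names are hypothetical stand-ins; the sketch does not reproduce how IPython.parallel works internally and may differ in details from the `soln/` files."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "class FakeRemoteError(Exception):\n",
    "    \"\"\"Hypothetical stand-in for parallel.RemoteError (ename/evalue only).\"\"\"\n",
    "    def __init__(self, ename, evalue):\n",
    "        Exception.__init__(self, '%s(%s)' % (ename, evalue))\n",
    "        self.ename = ename\n",
    "        self.evalue = evalue\n",
    "\n",
    "\n",
    "def run_on_engine(namespace, func):\n",
    "    \"\"\"Hypothetical: call func(namespace), wrapping any error like an engine would.\"\"\"\n",
    "    try:\n",
    "        return func(namespace)\n",
    "    except Exception as e:\n",
    "        raise FakeRemoteError(type(e).__name__, str(e))\n",
    "\n",
    "\n",
    "def simulated_remote_iterator(namespace, name):\n",
    "    \"\"\"Yield the items of namespace[name], fetching one item per call.\"\"\"\n",
    "    it_name = '_it_' + name\n",
    "    # Create the iterator on the 'engine' and keep it in its namespace.\n",
    "    run_on_engine(namespace, lambda ns: ns.update({it_name: iter(ns[name])}))\n",
    "    while True:\n",
    "        try:\n",
    "            item = run_on_engine(namespace, lambda ns: next(ns[it_name]))\n",
    "        except FakeRemoteError as e:\n",
    "            if e.ename == 'StopIteration':\n",
    "                return  # remote iterator exhausted: end the local generator\n",
    "            raise  # any other error propagates unchanged\n",
    "        yield item\n",
    "\n",
    "\n",
    "engine_ns = {'data': [3, 5, 7]}\n",
    "print(list(simulated_remote_iterator(engine_ns, 'data')))  # [3, 5, 7]"
   ]
  },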
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we put `IPython.parallel` to work:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "dview = rc.direct_view()\n",
    "print('Engine IDs:', rc.ids)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Load the data on the engines\n",
    "data_refs = dview.map(load_data, rc.ids)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "data_refs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "list(data_refs)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Make one local iterator per engine, each pulling from that engine's dataset\n",
    "iterators = [remote_iterator(rc[e], ref) for e, ref in zip(rc.ids, data_refs)]\n",
    "for it in iterators:\n",
    "    print(list(it))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, let's merge those datasets into a single sorted one:"
   ]
  },
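  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`heapq.merge` is lazy: it holds one pending item per input and advances an input only after that input's current item has been emitted. With remote iterators that fetch one item per call, engines are therefore queried on demand, one element at a time, rather than all data being pulled up front. The local sketch below uses a hypothetical `tracked` wrapper that logs every item it hands out; taking only the first three merged values leaves most of the inputs untouched."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import heapq\n",
    "import itertools\n",
    "\n",
    "pulled = []  # log of (stream label, value), in the order items were fetched\n",
    "\n",
    "\n",
    "def tracked(label, values):\n",
    "    \"\"\"Hypothetical wrapper: yield values, logging each one as it is fetched.\"\"\"\n",
    "    for v in values:\n",
    "        pulled.append((label, v))\n",
    "        yield v\n",
    "\n",
    "\n",
    "streams = [tracked('a', [1, 4, 7]),\n",
    "           tracked('b', [2, 5, 8]),\n",
    "           tracked('c', [3, 6, 9])]\n",
    "first_three = list(itertools.islice(heapq.merge(*streams), 3))\n",
    "print(first_three)  # [1, 2, 3]\n",
    "print('%d of 9 items fetched: %s' % (len(pulled), pulled))"
   ]
  },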
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "print('Locally merge the remote sets:')\n",
    "# The iterators above were exhausted by list(), so create fresh ones\n",
    "iterators = [remote_iterator(rc[e], ref) for e, ref in zip(rc.ids, data_refs)]\n",
    "remote = list(heapq.merge(*iterators))\n",
    "print(remote)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Validation"
   ]
  },
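  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before pulling anything back from the engines, the merge logic itself can be checked entirely locally. The sketch below rebuilds the datasets that `load_data` produces for arguments 0 to 3 (what it receives on a four-engine cluster whose IDs are 0 to 3, which is an assumption), merges them, and compares the result with a plain sort of all the items. `local_dataset` is a hypothetical helper that repeats the formula from `load_data` without the global variable."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import heapq\n",
    "\n",
    "\n",
    "def local_dataset(arg):\n",
    "    \"\"\"Hypothetical helper: the load_data formula, returned instead of stored.\"\"\"\n",
    "    s = 4 - arg\n",
    "    step = arg + 1\n",
    "    return list(range(s, s + 4*step**2, step))\n",
    "\n",
    "\n",
    "datasets = [local_dataset(arg) for arg in range(4)]\n",
    "expected = list(heapq.merge(*datasets))\n",
    "everything = [x for d in datasets for x in d]\n",
    "print('%d items, matches sorted(): %s' % (len(expected), expected == sorted(everything)))"
   ]
  },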
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Repeat the operation by pulling the data from the engines into our local namespace and doing a regular merge here:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Key step here: pull data from each engine:\n",
    "local_data = [rc[e][ref] for e,ref in zip(rc.ids, data_refs)]\n",
    "print('Local data:')\n",
    "for subset in local_data:\n",
    "    print(subset)\n",
    "print('Sorted:')\n",
    "local = list(heapq.merge(*local_data))\n",
    "print(local)\n",
    "print(\"local == remote: %s\" % (local==remote))"
   ]
  }
 ],
 "metadata": {},
 "nbformat": 4,
 "nbformat_minor": 0
}